package com.kelai.jms.listenner.adapter;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.kelai.jms.RedisMessageChannelEnum;
import com.kelai.jms.handel.RedisQueueMessageDelegateHandle;
import com.kelai.jms.message.RedisQueueMessage;
import com.kelai.utils.ContextUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Asynchronous message adapter backed by a Redis list.
 *
 * <p>Producers enqueue messages with {@link #sendMessage} (tail of the list) or
 * {@link #sendPriorMessage} (head of the list). After the bean is initialised, a single
 * daemon polling thread pops messages from the head of the list with BLPOP and hands each
 * one to the configured {@link RedisQueueMessageDelegateHandle} on an internal thread pool.
 *
 * <p>Created by Luffy on 2017/08/17.
 *
 * @param <REQ> type of the queued message
 * @param <RES> result type of the delegate handler
 */
public class RedisQueueMessageHandleAdapter<REQ extends RedisQueueMessage, RES> implements InitializingBean, DisposableBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisQueueMessageHandleAdapter.class);

    /** Maximum number of seconds a single BLPOP may block. */
    private static final int POLL_TIMEOUT_SECONDS = 3;

    /** Pause between two polling iterations. */
    private static final long POLL_PAUSE_MILLIS = 100;

    /** Pause while {@code ContextUtil.current} is still null. */
    private static final long CONTEXT_WAIT_MILLIS = 1000 * 30;

    /** Pause after a failed reconnect attempt. */
    private static final long RECONNECT_PAUSE_MILLIS = 1000;

    /** Grace period granted to in-flight handlers when the bean is destroyed. */
    private static final long SHUTDOWN_GRACE_SECONDS = 10;

    /** How long to wait for the polling thread to stop on its own before interrupting it. */
    private static final long STOP_WAIT_MILLIS = 1000;

    protected RedisTemplate<String, REQ> redisTemplate;

    private Environment environment;

    private RedisQueueMessageDelegateHandle<REQ, RES> delegateHandle;

    /** Kept for configuration compatibility; the current polling implementation does not use it. */
    private ThreadPoolTaskExecutor taskExecutor;

    /**
     * Internal polling ("watchdog") thread. Volatile because it is written both by the
     * polling thread itself and by whichever thread starts/stops it.
     */
    private volatile Thread dog = null;

    /**
     * Whether the polling thread is supposed to be running. Volatile so that the polling
     * thread observes a stop request issued from another thread.
     */
    private volatile boolean isRunning = false;

    public void setRedisTemplate(RedisTemplate<String, REQ> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void setDelegateHandle(RedisQueueMessageDelegateHandle<REQ, RES> delegateHandle) {
        this.delegateHandle = delegateHandle;
    }

    public void setTaskExecutor(ThreadPoolTaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public Environment getEnvironment() {
        return environment;
    }

    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    /** Serialized list key, used for the raw BLPOP call. */
    private byte[] rawKey;
    private RedisConnectionFactory factory;
    /** Dedicated connection for the blocking pop; guarded by {@link #lock}. */
    private RedisConnection connection;
    /** Template-backed list operations used for the non-blocking calls (push, size). */
    private BoundListOperations<String, REQ> listOperations;

    /** Serializes use of the dedicated blocking connection. */
    private final Lock lock = new ReentrantLock();

    /** Pool on which the delegate handler is invoked. */
    private ExecutorService threadPool;

    /** Set once {@link #destroy()} has been called; volatile for cross-thread visibility. */
    private volatile boolean isClosed = false;


    /**
     * Builds the handler thread pool, binds to the Redis list named by the delegate's
     * channel, obtains the dedicated blocking connection and starts the polling thread.
     *
     * @throws IllegalStateException if a required collaborator has not been injected
     */
    @Override
    @SuppressWarnings("unchecked")
    public void afterPropertiesSet() {
        requireConfigured(environment, "environment");
        requireConfigured(redisTemplate, "redisTemplate");
        requireConfigured(delegateHandle, "delegateHandle");

        int poolSize = environment.getProperty("thread.pool.redis.store.init", Integer.class, 200);
        int maxPoolSize = environment.getProperty("thread.pool.redis.store.max", Integer.class, 2000);
        // NOTE(review): with an unbounded work queue a ThreadPoolExecutor never grows past its
        // core size, so maxPoolSize is effectively unused and the rejection handler only fires
        // after the pool has been shut down. Left as-is to preserve the existing throughput profile.
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, 1,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(),
                new ThreadFactoryBuilder().setNameFormat("device-data-pool-%d").build(),
                (r, executor) -> LOGGER.error("reject device update"));

        factory = redisTemplate.getConnectionFactory();
        connection = RedisConnectionUtils.getConnection(factory);
        RedisSerializer<String> redisSerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer();
        RedisMessageChannelEnum channelEnum = delegateHandle.getChannelEnum();
        rawKey = redisSerializer.serialize(channelEnum.channel());
        listOperations = redisTemplate.boundListOps(channelEnum.channel());

        startScanThread();
    }

    /** Fails fast with a descriptive message when a required collaborator is missing. */
    private static void requireConfigured(Object dependency, String name) {
        if (dependency == null) {
            throw new IllegalStateException(
                    "RedisQueueMessageHandleAdapter requires '" + name + "' to be set before initialisation");
        }
    }

    /** Starts the daemon polling thread, stopping a previously started one first. */
    private void startScanThread() {
        if (this.isRunning) {
            LOGGER.warn("线程已经存在,正在重启...");
            stopScanThread();
        }
        LOGGER.info("~~~~~~~~消耗消息线程池监听程序启动...~~~~~~~~~~~~");
        this.isRunning = true;
        Thread scanner = new Thread(this::onStart);
        scanner.setDaemon(true);
        this.dog = scanner;
        scanner.start();
        LOGGER.warn("start the thread dog===================");
    }

    /**
     * Asks the polling thread to stop, waits about one second for it to finish on its own
     * and interrupts it if it is still alive after that.
     */
    private void stopScanThread() {
        this.isRunning = false;
        Thread scanner = this.dog;
        if (scanner != null && scanner != Thread.currentThread()) {
            try {
                scanner.join(STOP_WAIT_MILLIS);
                if (scanner.isAlive()) {
                    scanner.interrupt();
                    // A BLPOP that is already in flight cannot be interrupted; give it time to return.
                    scanner.join(TimeUnit.SECONDS.toMillis(POLL_TIMEOUT_SECONDS + 1));
                }
            } catch (InterruptedException e) {
                scanner.interrupt();
                // Preserve the caller's interrupt status instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }
        this.dog = null;
    }

    /**
     * Polling loop executed by the polling thread. Runs until the adapter is closed, a stop
     * is requested or the thread is interrupted. A failed poll resets the Redis connection
     * instead of terminating the loop.
     */
    private void onStart() {
        try {
            while (!isClosed && isRunning && !Thread.currentThread().isInterrupted()) {
                if (ContextUtil.current == null) {
                    LOGGER.info("current is null, wait....{}", new Date());
                    selfSleep(CONTEXT_WAIT_MILLIS);
                    continue;
                }
                try {
                    // takeFrom only issues the blocking pop when the list is non-empty,
                    // so an empty queue costs one size query plus the pause below.
                    REQ message = takeFrom(POLL_TIMEOUT_SECONDS);
                    if (message != null) {
                        LOGGER.info("find a one:{}", message.getId());
                        threadPool.execute(() -> handleSafely(message));
                    }
                } catch (Exception e) {
                    LOGGER.error("Polling the Redis queue failed; resetting the connection", e);
                    resetConnection();
                }
                selfSleep(POLL_PAUSE_MILLIS);
            }
        } finally {
            // Only clear the reference if it still points at this thread, so that a stale
            // thread cannot wipe out the reference to a freshly started replacement.
            if (this.dog == Thread.currentThread()) {
                this.dog = null;
            }
        }
    }

    /** Invokes the delegate and logs any failure so that handler errors are not lost silently. */
    private void handleSafely(REQ message) {
        try {
            delegateHandle.handleMessage(message);
        } catch (Exception e) {
            LOGGER.error("Handling message {} failed", message.getId(), e);
        }
    }

    /**
     * Stops polling, shuts the handler pool down and releases the dedicated Redis connection.
     * Subsequent calls are no-ops.
     */
    @Override
    public void destroy() throws Exception {
        if (isClosed) {
            return;
        }
        isClosed = true;
        stopScanThread();
        shutdown();
    }

    /**
     * Appends a message to the tail of the queue.
     *
     * @param req message to enqueue
     */
    public void sendMessage(REQ req) {
        listOperations.rightPush(req);
        // size() may return null (e.g. inside a pipeline/transaction); query it only once.
        Long pending = listOperations.size();
        if (pending != null && pending > 0) {
            LOGGER.info("RightPush剩余待执行消息总数({}): {},now:{}", delegateHandle.getChannelEnum().channel(), pending, req.getId());
        }
    }

    /**
     * Inserts a message at the head of the queue so that it is consumed before older ones.
     *
     * @param req message to enqueue with priority
     */
    public void sendPriorMessage(REQ req) {
        listOperations.leftPush(req);
        Long pending = listOperations.size();
        if (pending != null && pending > 0) {
            LOGGER.info("LeftPush剩余待执行消息总数({}): {}", delegateHandle.getChannelEnum().channel(), pending);
        }
    }

    /**
     * Removes and returns the first message of the queue using BLPOP on the dedicated
     * connection. The blocking call is only issued when the list currently reports at
     * least one element.
     *
     * @param timeout BLPOP timeout in seconds
     * @return the deserialized message, or {@code null} if the queue is empty, the pop timed
     *         out, or the calling thread was interrupted while waiting for the lock
     */
    @SuppressWarnings("unchecked")
    private REQ takeFrom(int timeout) {
        // Acquire the lock outside of the try/finally: if acquisition is interrupted the lock
        // is not held, and unlocking it would throw IllegalMonitorStateException.
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        try {
            RedisConnection current = connection;
            if (current == null) {
                return null;
            }
            Long pending = listOperations.size();
            if (pending == null || pending <= 0) {
                return null;
            }
            // BLPOP replies with [key, value].
            List<byte[]> results = current.bLPop(timeout, rawKey);
            if (CollectionUtils.isEmpty(results) || results.size() < 2) {
                return null;
            }
            LOGGER.info("剩余待执行消息总数({}): {}", delegateHandle.getChannelEnum().channel(), listOperations.size());
            return (REQ) redisTemplate.getValueSerializer().deserialize(results.get(1));
        } finally {
            lock.unlock();
        }
    }

    /**
     * Same as {@link #takeFrom(int)} but always issues the blocking pop and returns the raw
     * payload decoded as UTF-8 text instead of deserializing it.
     *
     * @param timeout BLPOP timeout in seconds
     * @return the payload as a string, or {@code null} if the pop timed out
     * @throws InterruptedException if interrupted while waiting for the lock
     * @throws UnsupportedEncodingException if UTF-8 is not supported by the runtime
     */
    @SuppressWarnings("unchecked")
    private String takeFromStr(int timeout) throws InterruptedException, UnsupportedEncodingException {
        // Acquire before the try so that finally never unlocks a lock that is not held.
        lock.lockInterruptibly();
        try {
            List<byte[]> results = connection.bLPop(timeout, rawKey);
            if (CollectionUtils.isEmpty(results) || results.size() < 2) {
                return null;
            }
            String req = new String(results.get(1), "UTF-8");
            LOGGER.info("剩余待执行消息总数({}): {}", delegateHandle.getChannelEnum().channel(), listOperations.size());
            return req;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Releases the resources owned by this adapter: gives running handlers a grace period,
     * then forces the pool down, and finally returns the dedicated Redis connection.
     */
    private void shutdown() {
        ExecutorService pool = threadPool;
        if (pool != null) {
            pool.shutdown();
            try {
                if (!pool.awaitTermination(SHUTDOWN_GRACE_SECONDS, TimeUnit.SECONDS)) {
                    List<Runnable> dropped = pool.shutdownNow();
                    LOGGER.warn("Handler pool did not terminate within {}s; {} queued task(s) were dropped",
                            SHUTDOWN_GRACE_SECONDS, dropped.size());
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }

        // Take the lock so that a pop still in flight finishes before its connection is released.
        lock.lock();
        try {
            if (connection != null) {
                RedisConnectionUtils.releaseConnection(connection, factory);
            }
        } catch (Exception e) {
            LOGGER.warn("Releasing the Redis connection failed", e);
        } finally {
            connection = null;
            lock.unlock();
        }
    }

    /**
     * Sleeps for the given time. If the thread is interrupted the sleep ends early and the
     * interrupt status is restored so that callers can react to it.
     */
    private void selfSleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Resets the dedicated Redis connection: releases the current one and retries obtaining
     * a new one until it succeeds, the adapter is closed or the thread is interrupted.
     *
     * @since 1.0.0
     */
    private void resetConnection() {
        lock.lock();
        try {
            try {
                if (connection != null) {
                    RedisConnectionUtils.releaseConnection(connection, factory);
                }
            } catch (Exception ex) {
                // The connection is presumably already broken; releasing it is best effort.
                LOGGER.debug("Releasing the broken Redis connection failed", ex);
            } finally {
                connection = null;
            }
            while (connection == null && !isClosed && !Thread.currentThread().isInterrupted()) {
                try {
                    connection = RedisConnectionUtils.getConnection(factory);
                    LOGGER.warn("Redis断线重连成功");
                } catch (Exception ex) {
                    connection = null;
                    LOGGER.warn("Redis断线重连失败!", ex);
                    selfSleep(RECONNECT_PAUSE_MILLIS);
                }
            }
        } finally {
            lock.unlock();
        }
    }
}
